---
toc_max_heading_level: 4
sidebar_position: 4
sidebar_label: Go
title: TDengine Go Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparation from "./_preparation.mdx"
import GoInsert from "../07-develop/03-insert-data/_go_sql.mdx"
import GoInfluxLine from "../07-develop/03-insert-data/_go_line.mdx"
import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx"

`driver-go` 是 TDengine 的官方 Go 语言连接器，实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/)  包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。

`driver-go` 提供两种建立连接的方式。一种是**原生连接**，它通过 TDengine 客户端驱动程序（taosc）原生连接 TDengine 运行实例，支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**，它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。

本文介绍如何安装 `driver-go`，并通过 `driver-go` 连接 TDengine 集群、进行数据查询、数据写入等基本操作。

`driver-go` 的源码托管在 [GitHub](https://github.com/taosdata/driver-go)。

## 支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
REST 连接支持所有能运行 Go 的平台。

## 版本支持

请参考[版本支持列表](https://github.com/taosdata/driver-go#remind)

## 处理异常

如果是 TDengine 错误可以通过以下方式获取错误码和错误信息。

```go
// import "github.com/taosdata/driver-go/v3/errors"
    if err != nil {
        tError, is := err.(*errors.TaosError)
        if is {
            fmt.Println("errorCode:", int(tError.Code))
            fmt.Println("errorMessage:", tError.ErrStr)
        } else {
            fmt.Println(err.Error())
        }
    }
```

## TDengine DataType 和 Go DataType

| TDengine DataType | Go Type   |
|-------------------|-----------|
| TIMESTAMP         | time.Time |
| TINYINT           | int8      |
| SMALLINT          | int16     |
| INT               | int32     |
| BIGINT            | int64     |
| TINYINT UNSIGNED  | uint8     |
| SMALLINT UNSIGNED | uint16    |
| INT UNSIGNED      | uint32    |
| BIGINT UNSIGNED   | uint64    |
| FLOAT             | float32   |
| DOUBLE            | float64   |
| BOOL              | bool      |
| BINARY            | string    |
| NCHAR             | string    |
| JSON              | []byte    |

**注意**：JSON 类型仅在 tag 中支持。

## 安装步骤

### 安装前准备

* 安装 Go 开发环境（Go 1.14 及以上，GCC 4.8.5 及以上）
* 如果使用原生连接器，请安装 TDengine 客户端驱动，具体步骤请参考[安装客户端驱动](../#安装客户端驱动)

配置好环境变量，检查命令：

* ```go env```
* ```gcc -v```

### 安装连接器

1. 使用 `go mod` 命令初始化项目：

   ```text
   go mod init taos-demo
   ```

2. 引入 taosSql ：

   ```go
   import (
     "database/sql"
     _ "github.com/taosdata/driver-go/v3/taosSql"
   )
   ```

3. 使用 `go mod tidy` 更新依赖包：

   ```text
   go mod tidy
   ```

4. 使用 `go run taos-demo` 运行程序或使用 `go build` 命令编译出二进制文件。

  ```text
  go run taos-demo
  go build
  ```

## 建立连接

数据源名称具有通用格式，例如 [PEAR DB](http://pear.php.net/manual/en/package.database.db.intro-dsn.php)，但没有类型前缀（方括号表示可选）：

``` text
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]
```

完整形式的 DSN：

```text
username:password@protocol(address)/dbname?param=value
```

<Tabs defaultValue="rest" groupId="connect">
<TabItem value="native" label="原生连接">

_taosSql_ 通过 cgo 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动就可以使用 [`database/sql`](https://golang.org/pkg/database/sql/) 的接口。

使用 `taosSql` 作为 `driverName` 并且使用一个正确的 [DSN](#DSN) 作为 `dataSourceName`，DSN 支持的参数：

* cfg 指定 taos.cfg 目录

示例：

```go
package main

import (
    "database/sql"
    "fmt"

    _ "github.com/taosdata/driver-go/v3/taosSql"
)

func main() {
    var taosUri = "root:taosdata@tcp(localhost:6030)/"
    taos, err := sql.Open("taosSql", taosUri)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
}
```

</TabItem>
<TabItem value="rest" label="REST 连接">

_taosRestful_ 通过 `http client` 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动就可以使用[`database/sql`](https://golang.org/pkg/database/sql/)的接口。

使用 `taosRestful` 作为 `driverName` 并且使用一个正确的 [DSN](#DSN) 作为 `dataSourceName`，DSN 支持的参数：

* `disableCompression` 是否接受压缩数据，默认为 true 不接受压缩数据，如果传输数据使用 gzip 压缩设置为 false。
* `readBufferSize` 读取数据的缓存区大小默认为 4K（4096），当查询结果数据量多时可以适当调大该值。

示例：

```go
package main

import (
    "database/sql"
    "fmt"

    _ "github.com/taosdata/driver-go/v3/taosRestful"
)

func main() {
    var taosUri = "root:taosdata@http(localhost:6041)/"
    taos, err := sql.Open("taosRestful", taosUri)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
}
```

</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">

_taosWS_ 通过 `WebSocket` 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动（driver-go 最低版本 3.0.2）就可以使用[`database/sql`](https://golang.org/pkg/database/sql/)的接口。

使用 `taosWS` 作为 `driverName` 并且使用一个正确的 [DSN](#DSN) 作为 `dataSourceName`，DSN 支持的参数：

* `writeTimeout` 通过 WebSocket 发送数据的超时时间。
* `readTimeout` 通过 WebSocket 接收响应数据的超时时间。

示例：

```go
package main

import (
    "database/sql"
    "fmt"

    _ "github.com/taosdata/driver-go/v3/taosWS"
)

func main() {
    var taosUri = "root:taosdata@ws(localhost:6041)/"
    taos, err := sql.Open("taosWS", taosUri)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
}
```

</TabItem>
</Tabs>

### 指定 URL 和 Properties 获取连接

Go 连接器不支持此功能

### 配置参数的优先级

Go 连接器不支持此功能

## 使用示例

### 创建数据库和表

```go
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
    log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
    log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
    log.Fatalln("failed to create stable, err:", err)
}
```

### 插入数据

<GoInsert />

### 查询数据

<GoQuery />

### 执行带有 reqId 的 SQL

此 reqId 可用于请求链路追踪。

```go
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
if err != nil {
    panic(err)
}
defer db.Close()
ctx := context.WithValue(context.Background(), common.ReqIDKey, common.GetReqID())
_, err = db.ExecContext(ctx, "create database if not exists example_taos_sql")
if err != nil {
    panic(err)
}
```

### 通过参数绑定写入数据

<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">

```go
package main

import (
    "time"

    "github.com/taosdata/driver-go/v3/af"
    "github.com/taosdata/driver-go/v3/common"
    "github.com/taosdata/driver-go/v3/common/param"
)

func main() {
    db, err := af.Open("", "root", "taosdata", "", 0)
    if err != nil {
        panic(err)
    }
    defer db.Close()
    _, err = db.Exec("create database if not exists example_stmt")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create table if not exists example_stmt.tb1(ts timestamp," +
        "c1 bool," +
        "c2 tinyint," +
        "c3 smallint," +
        "c4 int," +
        "c5 bigint," +
        "c6 tinyint unsigned," +
        "c7 smallint unsigned," +
        "c8 int unsigned," +
        "c9 bigint unsigned," +
        "c10 float," +
        "c11 double," +
        "c12 binary(20)," +
        "c13 nchar(20)" +
        ")")
    if err != nil {
        panic(err)
    }
    stmt := db.InsertStmt()
    err = stmt.Prepare("insert into example_stmt.tb1 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
    if err != nil {
        panic(err)
    }
    now := time.Now()
    params := make([]*param.Param, 14)
    params[0] = param.NewParam(2).
        AddTimestamp(now, common.PrecisionMilliSecond).
        AddTimestamp(now.Add(time.Second), common.PrecisionMilliSecond)
    params[1] = param.NewParam(2).AddBool(true).AddNull()
    params[2] = param.NewParam(2).AddTinyint(2).AddNull()
    params[3] = param.NewParam(2).AddSmallint(3).AddNull()
    params[4] = param.NewParam(2).AddInt(4).AddNull()
    params[5] = param.NewParam(2).AddBigint(5).AddNull()
    params[6] = param.NewParam(2).AddUTinyint(6).AddNull()
    params[7] = param.NewParam(2).AddUSmallint(7).AddNull()
    params[8] = param.NewParam(2).AddUInt(8).AddNull()
    params[9] = param.NewParam(2).AddUBigint(9).AddNull()
    params[10] = param.NewParam(2).AddFloat(10).AddNull()
    params[11] = param.NewParam(2).AddDouble(11).AddNull()
    params[12] = param.NewParam(2).AddBinary([]byte("binary")).AddNull()
    params[13] = param.NewParam(2).AddNchar("nchar").AddNull()

    paramTypes := param.NewColumnType(14).
        AddTimestamp().
        AddBool().
        AddTinyint().
        AddSmallint().
        AddInt().
        AddBigint().
        AddUTinyint().
        AddUSmallint().
        AddUInt().
        AddUBigint().
        AddFloat().
        AddDouble().
        AddBinary(6).
        AddNchar(5)
    err = stmt.BindParam(params, paramTypes)
    if err != nil {
        panic(err)
    }
    err = stmt.AddBatch()
    if err != nil {
        panic(err)
    }
    err = stmt.Execute()
    if err != nil {
        panic(err)
    }
    err = stmt.Close()
    if err != nil {
        panic(err)
    }
    // select * from example_stmt.tb1
}
```

</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">

```go
package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/taosdata/driver-go/v3/common"
    "github.com/taosdata/driver-go/v3/common/param"
    _ "github.com/taosdata/driver-go/v3/taosRestful"
    "github.com/taosdata/driver-go/v3/ws/stmt"
)

func main() {
    db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
    if err != nil {
        panic(err)
    }
    defer db.Close()
    prepareEnv(db)

    config := stmt.NewConfig("ws://127.0.0.1:6041/rest/stmt", 0)
    config.SetConnectUser("root")
    config.SetConnectPass("taosdata")
    config.SetConnectDB("example_ws_stmt")
    config.SetMessageTimeout(common.DefaultMessageTimeout)
    config.SetWriteWait(common.DefaultWriteWait)
    config.SetErrorHandler(func(connector *stmt.Connector, err error) {
        panic(err)
    })
    config.SetCloseHandler(func() {
        fmt.Println("stmt connector closed")
    })

    connector, err := stmt.NewConnector(config)
    if err != nil {
        panic(err)
    }
    now := time.Now()
    {
        stmt, err := connector.Init()
        if err != nil {
            panic(err)
        }
        err = stmt.Prepare("insert into ? using all_json tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        if err != nil {
            panic(err)
        }
        err = stmt.SetTableName("tb1")
        if err != nil {
            panic(err)
        }
        err = stmt.SetTags(param.NewParam(1).AddJson([]byte(`{"tb":1}`)), param.NewColumnType(1).AddJson(0))
        if err != nil {
            panic(err)
        }
        params := []*param.Param{
            param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
            param.NewParam(3).AddBool(true).AddNull().AddBool(true),
            param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
            param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
            param.NewParam(3).AddInt(1).AddNull().AddInt(1),
            param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
            param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
            param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
            param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
            param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
            param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
            param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
            param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
            param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
        }
        paramTypes := param.NewColumnType(14).
            AddTimestamp().
            AddBool().
            AddTinyint().
            AddSmallint().
            AddInt().
            AddBigint().
            AddUTinyint().
            AddUSmallint().
            AddUInt().
            AddUBigint().
            AddFloat().
            AddDouble().
            AddBinary(0).
            AddNchar(0)
        err = stmt.BindParam(params, paramTypes)
        if err != nil {
            panic(err)
        }
        err = stmt.AddBatch()
        if err != nil {
            panic(err)
        }
        err = stmt.Exec()
        if err != nil {
            panic(err)
        }
        affected := stmt.GetAffectedRows()
        fmt.Println("all_json affected rows:", affected)
        err = stmt.Close()
        if err != nil {
            panic(err)
        }
    }
    {
        stmt, err := connector.Init()
        if err != nil {
            panic(err)
        }
        err = stmt.Prepare("insert into ? using all_all tags(?,?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        err = stmt.SetTableName("tb1")
        if err != nil {
            panic(err)
        }

        err = stmt.SetTableName("tb2")
        if err != nil {
            panic(err)
        }
        err = stmt.SetTags(
            param.NewParam(14).
                AddTimestamp(now, 0).
                AddBool(true).
                AddTinyint(2).
                AddSmallint(2).
                AddInt(2).
                AddBigint(2).
                AddUTinyint(2).
                AddUSmallint(2).
                AddUInt(2).
                AddUBigint(2).
                AddFloat(2).
                AddDouble(2).
                AddBinary([]byte("tb2")).
                AddNchar("tb2"),
            param.NewColumnType(14).
                AddTimestamp().
                AddBool().
                AddTinyint().
                AddSmallint().
                AddInt().
                AddBigint().
                AddUTinyint().
                AddUSmallint().
                AddUInt().
                AddUBigint().
                AddFloat().
                AddDouble().
                AddBinary(0).
                AddNchar(0),
        )
        if err != nil {
            panic(err)
        }
        params := []*param.Param{
            param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
            param.NewParam(3).AddBool(true).AddNull().AddBool(true),
            param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
            param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
            param.NewParam(3).AddInt(1).AddNull().AddInt(1),
            param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
            param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
            param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
            param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
            param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
            param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
            param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
            param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
            param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
        }
        paramTypes := param.NewColumnType(14).
            AddTimestamp().
            AddBool().
            AddTinyint().
            AddSmallint().
            AddInt().
            AddBigint().
            AddUTinyint().
            AddUSmallint().
            AddUInt().
            AddUBigint().
            AddFloat().
            AddDouble().
            AddBinary(0).
            AddNchar(0)
        err = stmt.BindParam(params, paramTypes)
        if err != nil {
            panic(err)
        }
        err = stmt.AddBatch()
        if err != nil {
            panic(err)
        }
        err = stmt.Exec()
        if err != nil {
            panic(err)
        }
        affected := stmt.GetAffectedRows()
        fmt.Println("all_all affected rows:", affected)
        err = stmt.Close()
        if err != nil {
            panic(err)
        }

    }
}

func prepareEnv(db *sql.DB) {
    steps := []string{
        "create database example_ws_stmt",
        "create table example_ws_stmt.all_json(ts timestamp," +
            "c1 bool," +
            "c2 tinyint," +
            "c3 smallint," +
            "c4 int," +
            "c5 bigint," +
            "c6 tinyint unsigned," +
            "c7 smallint unsigned," +
            "c8 int unsigned," +
            "c9 bigint unsigned," +
            "c10 float," +
            "c11 double," +
            "c12 binary(20)," +
            "c13 nchar(20)" +
            ")" +
            "tags(t json)",
        "create table example_ws_stmt.all_all(" +
            "ts timestamp," +
            "c1 bool," +
            "c2 tinyint," +
            "c3 smallint," +
            "c4 int," +
            "c5 bigint," +
            "c6 tinyint unsigned," +
            "c7 smallint unsigned," +
            "c8 int unsigned," +
            "c9 bigint unsigned," +
            "c10 float," +
            "c11 double," +
            "c12 binary(20)," +
            "c13 nchar(20)" +
            ")" +
            "tags(" +
            "tts timestamp," +
            "tc1 bool," +
            "tc2 tinyint," +
            "tc3 smallint," +
            "tc4 int," +
            "tc5 bigint," +
            "tc6 tinyint unsigned," +
            "tc7 smallint unsigned," +
            "tc8 int unsigned," +
            "tc9 bigint unsigned," +
            "tc10 float," +
            "tc11 double," +
            "tc12 binary(20)," +
            "tc13 nchar(20))",
    }
    for _, step := range steps {
        _, err := db.Exec(step)
        if err != nil {
            panic(err)
        }
    }
}

```

</TabItem>
</Tabs>

### 无模式写入

<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">

```go
import (
    "fmt"

    "github.com/taosdata/driver-go/v3/af"
)

func main() {
    conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
    if err != nil {
        fmt.Println("fail to connect, err:", err)
    }
    defer conn.Close()
    _, err = conn.Exec("create database if not exists example")
    if err != nil {
        panic(err)
    }
    _, err = conn.Exec("use example")
    if err != nil {
        panic(err)
    }
    influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
    err = conn.InfluxDBInsertLines([]string{influxdbData}, "ns")
    if err != nil {
        panic(err)
    }
    telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
    err = conn.OpenTSDBInsertTelnetLines([]string{telnetData})
    if err != nil {
        panic(err)
    }
    jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
    err = conn.OpenTSDBInsertJsonPayload(jsonData)
    if err != nil {
        panic(err)
    }
}    
```

</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">

```go
import (
    "database/sql"
    "log"
    "time"

    "github.com/taosdata/driver-go/v3/common"
    _ "github.com/taosdata/driver-go/v3/taosWS"
    "github.com/taosdata/driver-go/v3/ws/schemaless"
)

func main() {
    db, err := sql.Open("taosWS", "root:taosdata@ws(localhost:6041)/")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    _, err = db.Exec("create database if not exists schemaless_ws")
    if err != nil {
        log.Fatal(err)
    }
    s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041/rest/schemaless", 1,
        schemaless.SetDb("schemaless_ws"),
        schemaless.SetReadTimeout(10*time.Second),
        schemaless.SetWriteTimeout(10*time.Second),
        schemaless.SetUser("root"),
        schemaless.SetPassword("taosdata"),
        schemaless.SetErrorHandler(func(err error) {
            log.Fatal(err)
        }),
    ))
    if err != nil {
        panic(err)
    }
    influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
    telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
    jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"

    err = s.Insert(influxdbData, schemaless.InfluxDBLineProtocol, "ns", 0, common.GetReqID())
    if err != nil {
        panic(err)
    }
    err = s.Insert(telnetData, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
    if err != nil {
        panic(err)
    }
    err = s.Insert(jsonData, schemaless.OpenTSDBJsonFormatProtocol, "ms", 0, common.GetReqID())
    if err != nil {
        panic(err)
    }
}
```

</TabItem>
</Tabs>

### 执行带有 reqId 的无模式写入

```go
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
```

可以通过 `common.GetReqID()` 获取唯一 id。

### 数据订阅

TDengine Go 连接器支持订阅功能，应用 API 如下：

#### 创建 Topic

```go
    db, err := af.Open("", "root", "taosdata", "", 0)
    if err != nil {
        panic(err)
    }
    defer db.Close()
    _, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
    if err != nil {
        panic(err)
    }
```

#### 创建 Consumer

```go
    consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
        "group.id":                     "test",
        "auto.offset.reset":            "latest",
        "td.connect.ip":                "127.0.0.1",
        "td.connect.user":              "root",
        "td.connect.pass":              "taosdata",
        "td.connect.port":              "6030",
        "client.id":                    "test_tmq_client",
        "enable.auto.commit":           "false",
        "msg.with.table.name":          "true",
    })
    if err != nil {
        panic(err)
    }
```

#### 订阅消费数据

```go
    err = consumer.Subscribe("example_tmq_topic", nil)
    if err != nil {
        panic(err)
    }
    for i := 0; i < 5; i++ {
        ev := consumer.Poll(500)
        if ev != nil {
            switch e := ev.(type) {
            case *tmqcommon.DataMessage:
                fmt.Printf("get message:%v\n", e)
            case tmqcommon.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                panic(e)
            }
            consumer.Commit()
        }
    }
```

#### 指定订阅 Offset

```go
    partitions, err := consumer.Assignment()
    if err != nil {
        panic(err)
    }
    for i := 0; i < len(partitions); i++ {
        fmt.Println(partitions[i])
        err = consumer.Seek(tmqcommon.TopicPartition{
            Topic:     partitions[i].Topic,
            Partition: partitions[i].Partition,
            Offset:    0,
        }, 0)
        if err != nil {
            panic(err)
        }
    }
```

#### 关闭订阅

```go
    err = consumer.Close()
    if err != nil {
        panic(err)
    }
```

#### 完整示例

<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">

```go
package main

import (
    "fmt"
    "os"
    "time"

    "github.com/taosdata/driver-go/v3/af"
    "github.com/taosdata/driver-go/v3/af/tmq"
    tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)

func main() {
    db, err := af.Open("", "root", "taosdata", "", 0)
    if err != nil {
        panic(err)
    }
    defer db.Close()
    _, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
    if err != nil {
        panic(err)
    }
    consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
        "group.id":            "test",
        "auto.offset.reset":   "latest",
        "td.connect.ip":       "127.0.0.1",
        "td.connect.user":     "root",
        "td.connect.pass":     "taosdata",
        "td.connect.port":     "6030",
        "client.id":           "test_tmq_client",
        "enable.auto.commit":  "false",
        "msg.with.table.name": "true",
    })
    if err != nil {
        panic(err)
    }
    err = consumer.Subscribe("example_tmq_topic", nil)
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
    if err != nil {
        panic(err)
    }
    go func() {
        for {
            _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Millisecond * 100)
        }
    }()

    for i := 0; i < 5; i++ {
        ev := consumer.Poll(500)
        if ev != nil {
            switch e := ev.(type) {
            case *tmqcommon.DataMessage:
                fmt.Printf("get message:%v\n", e)
            case tmqcommon.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                panic(e)
            }
            consumer.Commit()
        }
    }
    partitions, err := consumer.Assignment()
    if err != nil {
        panic(err)
    }
    for i := 0; i < len(partitions); i++ {
        fmt.Println(partitions[i])
        err = consumer.Seek(tmqcommon.TopicPartition{
            Topic:     partitions[i].Topic,
            Partition: partitions[i].Partition,
            Offset:    0,
        }, 0)
        if err != nil {
            panic(err)
        }
    }

    partitions, err = consumer.Assignment()
    if err != nil {
        panic(err)
    }
    for i := 0; i < len(partitions); i++ {
        fmt.Println(partitions[i])
    }

    err = consumer.Close()
    if err != nil {
        panic(err)
    }
}
```

</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">

```go
package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/taosdata/driver-go/v3/common"
    tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
    _ "github.com/taosdata/driver-go/v3/taosRestful"
    "github.com/taosdata/driver-go/v3/ws/tmq"
)

func main() {
    db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
    if err != nil {
        panic(err)
    }
    defer db.Close()
    prepareEnv(db)
    consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
        "ws.url":                "ws://127.0.0.1:6041/rest/tmq",
        "ws.message.channelLen": uint(0),
        "ws.message.timeout":    common.DefaultMessageTimeout,
        "ws.message.writeWait":  common.DefaultWriteWait,
        "td.connect.user":       "root",
        "td.connect.pass":       "taosdata",
        "group.id":              "example",
        "client.id":             "example_consumer",
        "auto.offset.reset":     "latest",
    })
    if err != nil {
        panic(err)
    }
    err = consumer.Subscribe("example_ws_tmq_topic", nil)
    if err != nil {
        panic(err)
    }

    _, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
        "c1 bool," +
        "c2 tinyint," +
        "c3 smallint," +
        "c4 int," +
        "c5 bigint," +
        "c6 tinyint unsigned," +
        "c7 smallint unsigned," +
        "c8 int unsigned," +
        "c9 bigint unsigned," +
        "c10 float," +
        "c11 double," +
        "c12 binary(20)," +
        "c13 nchar(20)" +
        ")")
    if err != nil {
        panic(err)
    }
    go func() {
        for {
            _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Millisecond * 100)
        }

    }()
    for i := 0; i < 5; i++ {
        ev := consumer.Poll(500)
        if ev != nil {
            switch e := ev.(type) {
            case *tmqcommon.DataMessage:
                fmt.Printf("get message:%v\n", e)
            case tmqcommon.Error:
                fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
                panic(e)
            }
            consumer.Commit()
        }
    }
    partitions, err := consumer.Assignment()
    if err != nil {
        panic(err)
    }
    for i := 0; i < len(partitions); i++ {
        fmt.Println(partitions[i])
        err = consumer.Seek(tmqcommon.TopicPartition{
            Topic:     partitions[i].Topic,
            Partition: partitions[i].Partition,
            Offset:    0,
        }, 0)
        if err != nil {
            panic(err)
        }
    }

    partitions, err = consumer.Assignment()
    if err != nil {
        panic(err)
    }
    for i := 0; i < len(partitions); i++ {
        fmt.Println(partitions[i])
    }

    err = consumer.Close()
    if err != nil {
        panic(err)
    }
}

func prepareEnv(db *sql.DB) {
    _, err := db.Exec("create database example_ws_tmq WAL_RETENTION_PERIOD 86400")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create topic example_ws_tmq_topic as database example_ws_tmq")
    if err != nil {
        panic(err)
    }
}
```

</TabItem>
</Tabs>

### 更多示例程序

* [示例程序](https://github.com/taosdata/driver-go/tree/3.0/examples)
* [视频教程](https://www.taosdata.com/blog/2020/11/11/1951.html)。

## 常见问题

1. database/sql 中 stmt（参数绑定）相关接口崩溃

   REST 不支持参数绑定相关接口，建议使用`db.Exec`和`db.Query`。

2. 使用 `use db` 语句后执行其他语句报错 `[0x217] Database not specified or available`

   在 REST 接口中 SQL 语句的执行无上下文关联，使用 `use db` 语句不会生效，解决办法见上方使用限制章节。

3. 使用 taosSql 不报错使用 taosRestful 报错 `[0x217] Database not specified or available`

   因为 REST 接口无状态，使用 `use db` 语句不会生效，解决办法见上方使用限制章节。

4. `readBufferSize` 参数调大后无明显效果

   `readBufferSize` 调大后会减少获取结果时 `syscall` 的调用。如果查询结果的数据量不大，修改该参数不会带来明显提升，如果该参数修改过大，瓶颈会在解析 JSON 数据。如果需要优化查询速度，需要根据实际情况调整该值来达到查询效果最优。

5. `disableCompression` 参数设置为 `false` 时查询效率降低

   当 `disableCompression` 参数设置为 `false` 时查询结果会使用 `gzip` 压缩后传输，拿到数据后要先进行 `gzip` 解压。

6. `go get` 命令无法获取包，或者获取包超时

  设置 Go 代理 `go env -w GOPROXY=https://goproxy.cn,direct`。

## API 参考

全部 API 见 [driver-go 文档](https://pkg.go.dev/github.com/taosdata/driver-go/v3)
